Change Data Capture(CDC)详解:原理、工具对比与 Flink CDC 3.0 深度解析

发布于 2026-06-05

目录

什么是 CDC?

CDC(Change Data Capture) 即变更数据捕获,是一种通过监听数据库事务日志(而非轮询查询),实时捕获数据变更事件并传播到下游系统的技术。它是现代数据集成的核心手段,尤其适合构建实时数仓、数据湖和事件驱动架构。

CDC 的三种实现方式

方式原理优缺点
基于日志(Log-based)读取数据库的事务日志(Binlog/WAL/Redo Log)性能最优,零侵入,推荐方案
基于触发器(Trigger-based)数据库触发器写入影子表侵入性强,影响源库性能
基于轮询(Polling/Query-based)定时 SELECT 变更(timestamp 列)实现简单,但有延迟、漏变更风险

当前主流方案几乎都采用 基于日志 的方式。

主流 CDC 工具对比

工具生态特点
DebeziumKafka Connect 生态开源标杆,支持 MySQL/PG/Mongo/Oracle/SQL Server
Canal阿里开源专注 MySQL Binlog,轻量高效
Flink CDCApache Flink流式 ETL 一体化,支持全增量一体读取
Maxwell独立进程MySQL 专用,输出 JSON 到 Kafka
Oracle GoldenGateOracle 商业企业级,支持异构数据库复制
阿里云 DTS / 腾讯 DTS云服务托管式,开箱即用,支持多种源目标

典型应用场景

  1. 实时数仓同步 — 业务库 → Kafka → ClickHouse/StarRocks,实现秒级数据可见性
  2. 数据湖入湖 — CDC → Flink → Iceberg/Hudi,增量入湖替代全量导入
  3. 缓存失效 — 数据库变更实时同步到 Redis,保证缓存一致性
  4. 搜索索引构建 — 业务表变更实时写入 Elasticsearch
  5. 微服务事件发布 — 将数据库变更转为领域事件,驱动下游服务
  6. 异地多活 / 灾备 — 跨机房、跨地域的数据实时复制

关键设计考量

底层实现

alt text

CDC 基于日志的底层实现细节

核心原理:伪装复制协议

以 MySQL 为例,CDC 连接器(如 Debezium/Canal)的工作方式:

CDC Connector  →  向 MySQL 发送 COM_REGISTER_SLAVE 命令
               →  伪装为一个 MySQL Slave 节点
               →  MySQL Master 认为它是从库,主动推送 Binlog 事件流
               →  Connector 解析每个 Binlog Event,转为标准 CDC 事件

不同数据库的日志机制:

数据库日志类型捕获方式
MySQLBinlog (ROW 格式)伪装 Slave,通过 dump 协议接收
PostgreSQLWAL (Write-Ahead Log)逻辑复制槽 (Logical Replication Slot)
OracleRedo Log + LogMiner通过 LogMiner API 解析归档日志
SQL ServerTransaction LogCDC 功能内置,读取 cdc 系统表
MongoDBOplog / Change Stream通过 Change Stream API 订阅

Binlog ROW 格式事件解析

一条 UPDATE users SET name=‘Bob’ WHERE id=1 在 Binlog 中生成的事件序列:

┌─────────────┐    ┌─────────────┐    ┌──────────────┐    ┌──────────────────┐    ┌───────────┐
│ GTID Event  │ →  │ Query:BEGIN │ →  │ TABLE_MAP    │ →  │ UPDATE_ROWS      │ →  │ XID:COMMIT│
│ txn_id=uuid │    │             │    │ table_id=85  │    │ before: id=1,    │    │           │
│             │    │             │    │ schema=mydb  │    │   name='Alice'   │    │           │
│             │    │             │    │ table=users  │    │ after:  id=1,    │    │           │
│             │    │             │    │ col_types=[..│    │   name='Bob'     │    │           │
└─────────────┘    └─────────────┘    └──────────────┘    └──────────────────┘    └───────────┘

关键要点:


全量 + 增量一体化(Snapshot + Streaming)

传统方案(Debezium 默认)

对 users 表(100万行),按主键拆分 Chunk:
  Chunk-1: id ∈ [1, 10000]
  Chunk-2: id ∈ [10001, 20000]
  ...

每个 Chunk 的处理流程:
  1. 记录 Low Watermark(当前 Binlog Position)
  2. SELECT * FROM users WHERE id BETWEEN 1 AND 10000
  3. 记录 High Watermark(当前 Binlog Position)
  4. 读取 [Low, High] 之间的 Binlog 变更
  5. 如果 Binlog 中有对该 Chunk 内行的修改 → 用 Binlog 数据覆盖 SELECT 结果
  6. 输出修正后的 Chunk 数据

为什么这样不丢数据? 因为任何在 SELECT 执行期间发生的并发写入,一定会出现在 [Low, High] 区间的 Binlog 中,用它来 “修正” SELECT 结果就等于拿到了一个一致性快照。

Exactly-Once 语义的实现

alt text

Schema 变更(DDL)处理

这是 CDC 最棘手的问题之一。当源端执行 ALTER TABLE ADD COLUMN 时:

问题本质

时间线:
  t1: CDC 正在处理 Binlog 事件(schema v1: 3列)
  t2: 源库执行 ALTER TABLE ADD COLUMN age INT
  t3: 后续 Binlog 事件变成 4 列 → 解析器用 v1 schema 解析 v2 数据 = 崩溃

解决方案(以 Debezium 为例)

1. Schema History Topic(Kafka 内部 Topic)
   ┌────────────────────────────────────────────┐
   │ 记录每次 DDL 变更时的完整 schema + 对应 Binlog 位点  │
   │ {"pos": "bin.003:8192", "ddl": "ALTER...",     │
   │  "schema": {"columns": [...], "pk": "id"}}    │
   └────────────────────────────────────────────┘

2. 处理流程:
   ① CDC 引擎检测到 DDL Event(Query Event 中包含 ALTER/CREATE/DROP)
   ② 将新 schema 写入 Schema History Topic
   ③ 内存中切换 schema 版本
   ④ 后续行事件用新 schema 解析

3. 故障恢复时:
   ① 从 Schema History Topic 重放所有 schema 变更
   ② 根据当前 Binlog 位点确定应该使用哪个版本的 schema
   ③ 继续正常解析

各工具的 DDL 处理能力

场景DebeziumFlink CDCCanal
ADD COLUMN自动适配自动适配自动适配
DROP COLUMN输出 null需重启任务可能异常
RENAME TABLE中断,需重配需重启支持
改列类型部分支持需注意下游兼容部分支持

大事务 / 长事务处理

问题

假设源库执行: UPDATE orders SET status='done' WHERE date < '2024-01-01'
影响 500 万行 → 单个事务在 Binlog 中生成 ~2GB 的事件

CDC 引擎需要:
  ① 在内存中缓存整个事务(等 COMMIT 才能发出)
  ② 如果事务回滚了,所有缓存作废
  → 内存溢出风险、延迟激增

解决策略

策略实现方式适用场景
磁盘缓冲事务数据溢写到磁盘(Debezium 的 transaction.buffer)大事务不可避免时
分批提交业务侧改为 LIMIT 分批 UPDATE可控制源端时
事务边界流式发送不等 COMMIT,边收边发,标记事务 IDFlink CDC 部分支持
内存限制 + 告警设置缓冲上限,超过则跳过或报错保护下游稳定

Debezium 具体配置:

# 大事务缓冲策略
transaction.topic=dbserver1.transaction
transaction.buffer.size=1048576000  # 1GB 磁盘缓冲
max.batch.size=2048                 # 每批最大条数
max.queue.size=8192                 # 内部队列大小

Flink CDC

2.x 到 3.0 的核心区别

alt text

alt text

维度Flink CDC 2.xFlink CDC 3.0
定位Source Connector(一个 Flink 数据源)数据集成框架(独立 Pipeline 引擎)
使用方式写 Flink SQL 或 Java DataStream 代码YAML 声明式配置,无需写代码
同步粒度单表 → 单 Job整库同步,一个 Pipeline 搞定上百张表
Schema 变更需要用户自行处理内置 Schema Evolution 引擎,自动传播 DDL
数据转换外部 Flink 算子处理内置 Transform 引擎(列裁剪、计算列、过滤等)
表路由内置 Route 引擎(多表合并、重命名)

Pipeline 抽象模型

3.0 引入了四个核心抽象层:

┌─────────────────────────────────────────────────────────────┐
│                    Pipeline Definition (YAML)                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Source  →→  Transform  →→  Route  →→  Sink                │
│  (整库)     (列运算)      (表路由)    (目标库)              │
│                                                             │
│  底层承载:                                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │         Event 统一事件模型 (Data + Schema + DDL)       │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │         Pipeline Composer (编排引擎)                   │   │
│  │         → 解析 YAML → 构建 Flink Job Graph            │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

统一事件模型(这是最关键的设计)

2.x 的 CDC Source 只输出 RowData(行数据),DDL 变更根本不在事件流里。

3.0 重新设计了事件类型体系:

// CDC 3.0 的事件抽象
interface Event {
    // 所有事件的公共接口
}

// 数据变更事件(INSERT/UPDATE/DELETE)
class DataChangeEvent implements Event {
    TableId tableId;          // 来自哪张表
    RecordData before;        // 变更前的行(UPDATE/DELETE 有值)
    RecordData after;         // 变更后的行(INSERT/UPDATE 有值)
    OperationType op;         // INSERT / UPDATE / DELETE
}

// Schema 变更事件(DDL)—— 2.x 里完全没有!
class SchemaChangeEvent implements Event {
    TableId tableId;
    // 具体子类型:
    //   AddColumnEvent     → ALTER TABLE ADD COLUMN
    //   DropColumnEvent    → ALTER TABLE DROP COLUMN
    //   RenameColumnEvent  → ALTER TABLE RENAME COLUMN
    //   AlterColumnTypeEvent → ALTER TABLE MODIFY COLUMN
    //   CreateTableEvent   → CREATE TABLE
    //   DropTableEvent     → DROP TABLE
}

核心思想:DDL 和 DML 是同一条流中的事件,它们按顺序经过整条 Pipeline,保证了:

Schema Evolution 引擎的实现

这是 3.0 最复杂也最有价值的模块:

Source 端检测到 DDL Event(如 ALTER TABLE users ADD COLUMN age INT)


┌─────────────────────────────────────────────────────────┐
│  Schema Evolution Handler                                │
│                                                         │
│  1. 拦截 SchemaChangeEvent                               │
│  2. 查询当前 Sink 端的 Schema(已有哪些列?)              │
│  3. 根据配置策略决定如何处理:                               │
│     ┌────────────────────────────────────────────────┐  │
│     │ 策略 A:EVOLVE(默认)                           │  │
│     │   → 自动生成目标端的 DDL 并执行                    │  │
│     │   → MySQL ADD COLUMN → StarRocks ADD COLUMN     │  │
│     │                                                │  │
│     │ 策略 B:IGNORE                                  │  │
│     │   → 忽略这个 DDL,后续新列数据填 null             │  │
│     │                                                │  │
│     │ 策略 C:EXCEPTION                               │  │
│     │   → 抛出异常,暂停 Pipeline                      │  │
│     └────────────────────────────────────────────────┘  │
│  4. 更新内存中的 Schema Registry(版本化存储)             │
│  5. 继续处理后续 DataChangeEvent(用新 Schema 解析)      │
└─────────────────────────────────────────────────────────┘

Schema 版本化存储的实现细节:

SchemaManager 内部维护一个 TreeMap<Long, Schema>
  Key = Binlog Position / LSN / GTID
  Value = 该位点对应的完整 Schema

当处理 DataChangeEvent 时:
  1. 取出事件的 Position
  2. 在 TreeMap 中找到 <= 该 Position 的最新 Schema
  3. 用该 Schema 反序列化行数据
  → 保证即使乱序或重放,也能用正确版本的 Schema 解析

Transform 引擎

内置表达式引擎,支持在 YAML 里声明列级别的转换:

transform:
  - source-table: mydb.orders
    projection: id, user_id, amount, amount * 0.9 AS discounted  # 计算列
    filter: amount > 100                                          # 过滤条件
    primary-keys: id
    partition-keys: user_id
    table-options:                                                # 目标表选项
      bucket: 4

底层实现是将表达式编译为 Flink 内部的 RowData 投影算子,而不是解释执行,性能和手写 Flink SQL 一样。

Route 引擎(表路由 + 分库分表合并)

解决的问题:源库有 orders_00 ~ orders_99(100 张分表),目标端要合并成一张 orders:

route:
  - source-table: mydb.orders_\d+    # 正则匹配分表
    sink-table: analytics.orders     # 合并到一张目标表
    description: 分表合并路由
    
  - source-table: mydb.users
    sink-table: dw.dim_users         # 重命名

实现机制:

DataChangeEvent(tableId = "mydb.orders_05", ...)

       ▼ Route Engine 匹配规则

DataChangeEvent(tableId = "analytics.orders", ...)  ← tableId 被改写

       ▼ 发给 Sink

整库同步的实现(2.x 做不到的核心能力)

2.x 的痛点

想同步 100 张表到 StarRocks:
  → 需要写 100 条 Flink SQL(每张表一个 CREATE TABLE Source + Sink + INSERT INTO)
  → 启动 100 个 Flink Job
  → 每个 Job 建立独立的 MySQL 连接、独立消费 Binlog
  → 源库连接数爆炸 🔥

3.0 的解法

一个 Pipeline → 一个 Flink Job → 一个 MySQL 连接 → 读一份 Binlog → 分发给所有表:

MySQL Binlog Stream

    ├─ Event(table=users)      → Transform[users]    → Route → Sink[users]
    ├─ Event(table=orders)     → Transform[orders]   → Route → Sink[orders]
    ├─ Event(table=products)   → Transform[products] → Route → Sink[products]
    └─ ...(N 张表复用同一份 Binlog 流)

底层实现:
  1. Source 内部有一个 Dispatcher
  2. 根据 Event 中的 tableId 分发到对应的下游 Operator Chain
  3. 多表共享同一个 Binlog 连接 + 同一个 Checkpoint
  4. 新增表时自动发现(基于正则匹配 + 定时扫描元数据)

YAML 声明式配置

# flink-cdc-pipeline.yaml
source:
  type: mysql
  hostname: 10.0.0.1
  port: 3306
  username: cdc_user
  password: ******
  tables: mydb.\.*              # 正则,整库所有表
  server-id: 5400-5404          # 多并行度时的 server-id 范围

sink:
  type: starrocks
  jdbc-url: jdbc:mysql://10.0.0.2:9030
  load-url: 10.0.0.2:8030
  username: root
  password: ******
  database-name: ods

transform:
  - source-table: mydb.orders
    projection: "*, amount * 0.8 AS discounted_amount"
    filter: "status <> 'CANCELLED'"

route:
  - source-table: mydb.orders_\d+
    sink-table: ods.orders

pipeline:
  name: MySQL to StarRocks Sync
  parallelism: 4
  schema.change.behavior: evolve   # 自动传播 DDL

启动方式:

bin/flink-cdc.sh mysql-to-starrocks.yaml

一行命令,整库同步,Schema 自动演进。 对比 2.x 需要写 Java 代码 + 部署 Flink 集群 + 逐表配置,体验天差地别。


底层运行时如何编排

YAML 文件

    ▼ Pipeline Composer(编排器)

    ├─ 1. 解析 YAML → 构建 Pipeline Definition
    ├─ 2. 连接源库获取所有匹配的表的 Schema
    ├─ 3. 为每张表生成 Transform + Route 的执行计划
    ├─ 4. 生成 Flink JobGraph:
    │     Source Operator (1个) → Dispatcher → [Transform_1..N] → [Sink_1..N]
    ├─ 5. 注入 Schema Registry(初始 Schema 快照)
    └─ 6. 提交到 Flink 集群执行
    
运行时:
    ├─ Checkpoint 机制保证 Exactly-Once
    ├─ Schema Evolution Handler 监听 DDL 事件
    ├─ Table Discovery Thread 定时扫描新建表
    └─ Metrics 上报延迟、吞吐、Schema 变更次数

全量同步阶段的过滤支持

支持,但有重要限制 —— 过滤不会下推到 Snapshot 查询。

过滤层级配置位置全量阶段生效?是否下推到 Source SELECT
表级过滤source.tables✅ 是✅ 是(只 SELECT 匹配的表)
行级过滤transform.filter✅ 是(但有代价)❌ 不下推

行级过滤

transform:
  - source-table: mydb.orders
    filter: "status = 'PAID' AND amount > 100"   # 只同步已支付且金额>100的
    projection: "id, user_id, amount, status"

这个 filter 在全量阶段也会生效,但工作方式是:

Source Snapshot Reader

    │  SELECT * FROM orders WHERE id BETWEEN 1 AND 10000
    │  (注意!没有 WHERE status='PAID',是全量读取)

    │  读出 10000 行

Transform Operator(Flink 算子层)

    │  逐行判断: status = 'PAID' AND amount > 100 ?
    │  过滤掉 7000 行
    │  只保留 3000 行

Sink Writer
    │  写入 3000 行到 Iceberg

过滤发生在 Flink 算子内部,不是在 Source 的 SQL 查询里。

用scan.snapshot.filters(部分 Source 支持)

Flink CDC 3.0 的 MySQL Source 在某些版本支持:

source:
  type: mysql
  tables: mydb.orders
  scan.snapshot.filters: "status = 'PAID'"   # 下推到 snapshot SELECT

但这个是实验性功能,不是所有版本都有,使用前检查你的 connector 版本。